; rel="previous"',
+ }})
+
+ @istest
+ def add_link_header_nothing_changed(self):
+ rv = {}
+ options = {}
+
+ # when
+ _rv, _options = renderers.SWHAddLinkHeaderEnricher.add_link_header(
+ rv, options)
+
+ self.assertEquals(_rv, {})
+ self.assertEquals(_options, {})
+
+ @istest
+ def add_link_header_nothing_changed_2(self):
+ rv = {'headers': {}}
+ options = {}
+
+ # when
+ _rv, _options = renderers.SWHAddLinkHeaderEnricher.add_link_header(
+ rv, options)
+
+ self.assertEquals(_rv, {'headers': {}})
+ self.assertEquals(_options, {})
+
+
class RendererTestCase(unittest.TestCase):
@patch('swh.web.ui.renderers.g')
@patch('swh.web.ui.renderers.json')
@patch('swh.web.ui.renderers.request')
@patch('swh.web.ui.renderers.render_template')
@patch('swh.web.ui.renderers.SWHMultiResponse.filter_by_fields')
@istest
- def swh_multi_response_mimetype_html(self, mock_filter, mock_render,
- mock_request, mock_json, mock_g):
+ def swh_multi_response_mimetype_html(self, mock_filter,
+ mock_render, mock_request, mock_json,
+ mock_g):
# given
- data = {'data': [12, 34],
- 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'}
+ data = {
+ 'data': [12, 34],
+ 'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
+ }
mock_g.get.return_value = {'my_key': 'my_display_value'}
+ # mock_enricher.return_value = (data, {})
mock_filter.return_value = data
expected_env = {
'my_key': 'my_display_value',
'response_data': json.dumps(data),
'request': mock_request
}
def mock_mimetypes(key):
mimetypes = {
'text/html': 10,
'application/json': 0.1,
'application/yaml': 0.1
}
return mimetypes[key]
accept_mimetypes = MagicMock()
accept_mimetypes.__getitem__.side_effect = mock_mimetypes
accept_mimetypes.best_match = MagicMock(return_value='text/html')
mock_request.accept_mimetypes = accept_mimetypes
mock_json.dumps.return_value = json.dumps(data)
# when
rv = renderers.SWHMultiResponse.make_response_from_mimetype(data)
# then
- mock_filter.assert_called_once_with(renderers.SWHMultiResponse, data)
+ # mock_enricher.assert_called_once_with(data, {})
+ mock_filter.assert_called_once_with(data)
mock_render.assert_called_with('apidoc.html', **expected_env)
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.mimetype, 'text/html')
@patch('swh.web.ui.renderers.g')
@patch('swh.web.ui.renderers.yaml')
@patch('swh.web.ui.renderers.request')
@patch('swh.web.ui.renderers.SWHMultiResponse.filter_by_fields')
@istest
def swh_multi_response_mimetype_yaml(self, mock_filter,
mock_request, mock_yaml, mock_g):
# given
data = {'data': [12, 34],
'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'}
def mock_mimetypes(key):
mimetypes = {
'application/yaml': 10,
'application/json': 0.1,
'text/html': 0.1
}
return mimetypes[key]
accept_mimetypes = MagicMock()
accept_mimetypes.__getitem__.side_effect = mock_mimetypes
accept_mimetypes.best_match = MagicMock(
return_value='application/yaml')
mock_request.accept_mimetypes = accept_mimetypes
mock_yaml.dump.return_value = yaml.dump(data)
mock_filter.return_value = data
# when
rv = renderers.SWHMultiResponse.make_response_from_mimetype(data)
# then
- mock_filter.assert_called_once_with(renderers.SWHMultiResponse, data)
+ mock_filter.assert_called_once_with(data)
mock_yaml.dump.assert_called_once_with(data)
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.mimetype, 'application/yaml')
self.assertEqual(data, yaml.load(rv.data.decode('utf-8')))
@patch('swh.web.ui.renderers.g')
@patch('swh.web.ui.renderers.json')
@patch('swh.web.ui.renderers.request')
@patch('swh.web.ui.renderers.SWHMultiResponse.filter_by_fields')
@istest
def swh_multi_response_mimetype_json(self, mock_filter,
mock_request, mock_json, mock_g):
# given
data = {'data': [12, 34],
'id': 'adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'}
def mock_mimetypes(key):
mimetypes = {
'application/json': 10,
'text/html': 0.1,
'application/yaml': 0.1
}
return mimetypes[key]
accept_mimetypes = MagicMock()
accept_mimetypes.__getitem__.side_effect = mock_mimetypes
accept_mimetypes.best_match = MagicMock(
return_value='application/json')
mock_request.accept_mimetypes = accept_mimetypes
mock_json.dumps.return_value = json.dumps(data)
mock_filter.return_value = data
# when
rv = renderers.SWHMultiResponse.make_response_from_mimetype(data)
# then
- mock_filter.assert_called_once_with(renderers.SWHMultiResponse, data)
+ mock_filter.assert_called_once_with(data)
mock_json.dumps.assert_called_once_with(data)
self.assertEqual(rv.status_code, 200)
self.assertEqual(rv.mimetype, 'application/json')
self.assertEqual(data, json.loads(rv.data.decode('utf-8')))
@patch('swh.web.ui.renderers.request')
@istest
def swh_multi_response_make_response_not_list_dict(self, mock_request):
# given
incoming = Response()
# when
rv = renderers.SWHMultiResponse.make_response_from_mimetype(incoming)
# then
self.assertEqual(rv, incoming)
@patch('swh.web.ui.renderers.request')
@istest
def swh_filter_renderer_do_nothing(self, mock_request):
# given
mock_request.args = {}
swh_filter_renderer = renderers.SWHFilterEnricher()
input_data = {'a': 'some-data'}
# when
actual_data = swh_filter_renderer.filter_by_fields(input_data)
# then
self.assertEquals(actual_data, input_data)
@patch('swh.web.ui.renderers.utils')
@patch('swh.web.ui.renderers.request')
@istest
def swh_filter_renderer_do_filter(self, mock_request, mock_utils):
# given
mock_request.args = {'fields': 'a,c'}
mock_utils.filter_field_keys.return_value = {'a': 'some-data'}
swh_filter_user = renderers.SWHMultiResponse()
input_data = {'a': 'some-data',
'b': 'some-other-data'}
# when
actual_data = swh_filter_user.filter_by_fields(input_data)
# then
self.assertEquals(actual_data, {'a': 'some-data'})
mock_utils.filter_field_keys.assert_called_once_with(input_data,
{'a', 'c'})
@istest
def urlize_api_links(self):
# update api link with html links content with links
content = '{"url": "/api/1/abc/"}'
expected_content = '{"url": "/api/1/abc/"}'
self.assertEquals(renderers.urlize_api_links(content),
expected_content)
# update /browse link with html links content with links
content = '{"url": "/browse/def/"}'
expected_content = '{"url": "' \
'/browse/def/"}'
self.assertEquals(renderers.urlize_api_links(content),
expected_content)
# will do nothing since it's not an api url
other_content = '{"url": "/something/api/1/other"}'
self.assertEquals(renderers.urlize_api_links(other_content),
other_content)
@istest
def revision_id_from_url(self):
url = ('/browse/revision/9ba4bcb645898d562498ea66a0df958ef0e7a68c/'
'prev/9ba4bcb645898d562498ea66a0df958ef0e7aaaa/')
expected_id = '9ba4bcb645898d562498ea66a0df958ef0e7a68c'
self.assertEqual(renderers.revision_id_from_url(url), expected_id)
@istest
def safe_docstring_display(self):
# update api link with html links content with links
docstring = """This is my list header:
- Here is item 1, with a continuation
line right here
- Here is item 2
Here is something that is not part of the list"""
expected_docstring = """This is my list header:
- Here is item 1, with a continuation
line right here
- Here is item 2
Here is something that is not part of the list
"""
self.assertEquals(renderers.safe_docstring_display(docstring),
expected_docstring)
diff --git a/swh/web/ui/tests/test_utils.py b/swh/web/ui/tests/test_utils.py
index 49c13bc3..d7f4f545 100644
--- a/swh/web/ui/tests/test_utils.py
+++ b/swh/web/ui/tests/test_utils.py
@@ -1,853 +1,879 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import dateutil
import unittest
from unittest.mock import patch, call
from nose.tools import istest, nottest
from swh.web.ui import utils
class UtilsTestCase(unittest.TestCase):
def setUp(self):
self.url_map = [dict(rule='/other/',
methods=set(['GET', 'POST', 'HEAD']),
endpoint='foo'),
dict(rule='/some/old/url/',
methods=set(['GET', 'POST']),
endpoint='blablafn'),
dict(rule='/other/old/url/',
methods=set(['GET', 'HEAD']),
endpoint='bar'),
dict(rule='/other',
methods=set([]),
endpoint=None),
dict(rule='/other2',
methods=set([]),
endpoint=None)]
@istest
def filter_endpoints_1(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/some')
# then
self.assertEquals(actual_data, {
'/some/old/url/': {
'methods': ['GET', 'POST'],
'endpoint': 'blablafn'
}
})
@istest
def filter_endpoints_2(self):
# when
actual_data = utils.filter_endpoints(self.url_map, '/other',
blacklist=['/other2'])
# then
# rules /other is skipped because its' exactly the prefix url
# rules /other2 is skipped because it's blacklisted
self.assertEquals(actual_data, {
'/other/': {
'methods': ['GET', 'HEAD', 'POST'],
'endpoint': 'foo'
},
'/other/old/url/': {
'methods': ['GET', 'HEAD'],
'endpoint': 'bar'
}
})
@istest
def prepare_data_for_view_default_encoding(self):
self.maxDiff = None
# given
inputs = [
{
'data': b'some blah data'
},
{
'data': 1,
'data_url': '/api/1/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}]
# when
actual_result = utils.prepare_data_for_view(inputs)
# then
self.assertEquals(actual_result, [
{
'data': 'some blah data',
},
{
'data': 1,
'data_url': '/browse/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}
])
@istest
def prepare_data_for_view(self):
self.maxDiff = None
# given
inputs = [
{
'data': b'some blah data'
},
{
'data': 1,
'data_url': '/api/1/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}]
# when
actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
# then
self.assertEquals(actual_result, [
{
'data': 'some blah data',
},
{
'data': 1,
'data_url': '/browse/some/api/call',
},
{
'blah': 'foobar',
'blah_url': '/some/non/changed/api/call'
}
])
@istest
def prepare_data_for_view_ko_cannot_decode(self):
self.maxDiff = None
# given
inputs = {
'data': 'hé dude!'.encode('utf8'),
}
actual_result = utils.prepare_data_for_view(inputs, encoding='ascii')
# then
self.assertEquals(actual_result, {
'data': "Cannot decode the data bytes, try and set another "
"encoding in the url (e.g. ?encoding=utf8) or "
"download directly the "
"content's raw data.",
})
@istest
def filter_field_keys_dict_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory1', 'file2'})
# then
self.assertEqual(actual_res, {})
@istest
def filter_field_keys_dict(self):
# when
actual_res = utils.filter_field_keys(
{'directory': 1, 'file': 2, 'link': 3},
{'directory', 'link'})
# then
self.assertEqual(actual_res, {'directory': 1, 'link': 3})
@istest
def filter_field_keys_list_unknown_keys(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'1': 1, '2': 2, 'link': 3}],
{'d'})
# then
self.assertEqual(actual_res, [{}, {}])
@istest
def filter_field_keys_list(self):
# when
actual_res = utils.filter_field_keys(
[{'directory': 1, 'file': 2, 'link': 3},
{'dir': 1, 'fil': 2, 'lin': 3}],
{'directory', 'dir'})
# then
self.assertEqual(actual_res, [{'directory': 1}, {'dir': 1}])
@istest
def filter_field_keys_other(self):
# given
input_set = {1, 2}
# when
actual_res = utils.filter_field_keys(input_set, {'a', '1'})
# then
self.assertEqual(actual_res, input_set)
@istest
def fmap(self):
self.assertEquals([2, 3, None, 4],
utils.fmap(lambda x: x+1, [1, 2, None, 3]))
self.assertEquals([11, 12, 13],
list(utils.fmap(lambda x: x+10,
map(lambda x: x, [1, 2, 3]))))
self.assertEquals({'a': 2, 'b': 4},
utils.fmap(lambda x: x*2, {'a': 1, 'b': 2}))
self.assertEquals(100,
utils.fmap(lambda x: x*10, 10))
self.assertEquals({'a': [2, 6], 'b': 4},
utils.fmap(lambda x: x*2, {'a': [1, 3], 'b': 2}))
self.assertIsNone(utils.fmap(lambda x: x, None))
@istest
def person_to_string(self):
self.assertEqual(utils.person_to_string(dict(name='raboof',
email='foo@bar')),
'raboof ')
@istest
def parse_timestamp(self):
input_timestamps = [
'2016-01-12',
'2016-01-12T09:19:12+0100',
'Today is January 1, 2047 at 8:21:00AM',
'1452591542',
]
output_dates = [
datetime.datetime(2016, 1, 12, 0, 0, tzinfo=datetime.timezone.utc),
datetime.datetime(2016, 1, 12, 9, 19, 12,
tzinfo=dateutil.tz.tzoffset(None, 3600)),
datetime.datetime(2047, 1, 1, 8, 21, tzinfo=datetime.timezone.utc),
datetime.datetime(2016, 1, 12, 9, 39, 2,
tzinfo=datetime.timezone.utc),
]
for ts, exp_date in zip(input_timestamps, output_dates):
self.assertEquals(utils.parse_timestamp(ts), exp_date)
@istest
def enrich_release_0(self):
# when
actual_release = utils.enrich_release({})
# then
self.assertEqual(actual_release, {})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_1(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/content/sha1_git:123/'
# when
actual_release = utils.enrich_release({'target': '123',
'target_type': 'content'})
# then
self.assertEqual(actual_release, {
'target': '123',
'target_type': 'content',
'target_url': '/api/1/content/sha1_git:123/'
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:123')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_2(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/dir/23/'
# when
actual_release = utils.enrich_release({'target': '23',
'target_type': 'directory'})
# then
self.assertEqual(actual_release, {
'target': '23',
'target_type': 'directory',
'target_url': '/api/1/dir/23/'
})
mock_flask.url_for.assert_called_once_with('api_directory',
q='23')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_3(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/rev/3/'
# when
actual_release = utils.enrich_release({'target': '3',
'target_type': 'revision'})
# then
self.assertEqual(actual_release, {
'target': '3',
'target_type': 'revision',
'target_url': '/api/1/rev/3/'
})
mock_flask.url_for.assert_called_once_with('api_revision',
sha1_git='3')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_release_4(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/1/rev/4/'
# when
actual_release = utils.enrich_release({'target': '4',
'target_type': 'release'})
# then
self.assertEqual(actual_release, {
'target': '4',
'target_type': 'release',
'target_url': '/api/1/rev/4/'
})
mock_flask.url_for.assert_called_once_with('api_release',
sha1_git='4')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_no_type(self, mock_flask):
# when/then
self.assertEqual(utils.enrich_directory({'id': 'dir-id'}),
{'id': 'dir-id'})
# given
mock_flask.url_for.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'target': '123',
})
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'target': '123',
'target_url': '/api/content/sha1_git:123/',
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:123')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_with_context_and_type_file(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/content/sha1_git:123/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
}, context_url='/api/revision/revsha1/directory/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'file',
'name': 'hy',
'target': '789',
'target_url': '/api/content/sha1_git:123/',
'file_url': '/api/revision/revsha1/directory'
'/prefix/path/hy/'
})
mock_flask.url_for.assert_called_once_with('api_content_metadata',
q='sha1_git:789')
@patch('swh.web.ui.utils.flask')
@istest
def enrich_directory_with_context_and_type_dir(self, mock_flask):
# given
mock_flask.url_for.return_value = '/api/directory/456/'
# when
actual_directory = utils.enrich_directory({
'id': 'dir-id',
'type': 'dir',
'name': 'emacs-42',
'target_type': 'file',
'target': '456',
}, context_url='/api/revision/origin/2/directory/some/prefix/path/')
# then
self.assertEqual(actual_directory, {
'id': 'dir-id',
'type': 'dir',
'target_type': 'file',
'name': 'emacs-42',
'target': '456',
'target_url': '/api/directory/456/',
'dir_url': '/api/revision/origin/2/directory'
'/some/prefix/path/emacs-42/'
})
mock_flask.url_for.assert_called_once_with('api_directory',
sha1_git='456')
@istest
def enrich_content_without_hashes(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_content_with_hashes(self, mock_flask):
for h in ['sha1', 'sha256', 'sha1_git']:
# given
mock_flask.url_for.side_effect = [
'/api/content/%s:123/raw/' % h,
'/api/filetype/%s:123/' % h,
'/api/language/%s:123/' % h,
'/api/license/%s:123/' % h,
]
# when
enriched_content = utils.enrich_content(
{
'id': '123',
h: 'blahblah'
}
)
# then
self.assertEqual(
enriched_content,
{
'id': '123',
h: 'blahblah',
'data_url': '/api/content/%s:123/raw/' % h,
'filetype_url': '/api/filetype/%s:123/' % h,
'language_url': '/api/language/%s:123/' % h,
'license_url': '/api/license/%s:123/' % h,
}
)
mock_flask.url_for.assert_has_calls([
call('api_content_raw', q='%s:blahblah' % h),
call('api_content_filetype', q='%s:blahblah' % h),
call('api_content_language', q='%s:blahblah' % h),
call('api_content_license', q='%s:blahblah' % h),
])
mock_flask.reset()
@istest
def enrich_entity_identity(self):
# when/then
self.assertEqual(utils.enrich_content({'id': '123'}),
{'id': '123'})
@patch('swh.web.ui.utils.flask')
@istest
def enrich_entity_with_sha1(self, mock_flask):
# given
def url_for_test(fn, **entity):
return '/api/entity/' + entity['uuid'] + '/'
mock_flask.url_for.side_effect = url_for_test
# when
actual_entity = utils.enrich_entity({
'uuid': 'uuid-1',
'parent': 'uuid-parent',
'name': 'something'
})
# then
self.assertEqual(actual_entity, {
'uuid': 'uuid-1',
'uuid_url': '/api/entity/uuid-1/',
'parent': 'uuid-parent',
'parent_url': '/api/entity/uuid-parent/',
'name': 'something',
})
mock_flask.url_for.assert_has_calls([call('api_entity_by_uuid',
uuid='uuid-1'),
call('api_entity_by_uuid',
uuid='uuid-parent')])
@nottest
def _url_for_context_test(self, fn, **data):
if fn == 'api_revision':
if 'context' in data and data['context'] is not None:
return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa
else:
return '/api/revision/%s/' % data['sha1_git']
elif fn == 'api_revision_log':
if 'prev_sha1s' in data:
return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % data['sha1_git']
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_without_children_or_parent(self, mock_flask):
# given
def url_for_test(fn, **data):
if fn == 'api_revision':
return '/api/revision/' + data['sha1_git'] + '/'
elif fn == 'api_revision_log':
return '/api/revision/' + data['sha1_git'] + '/log/'
elif fn == 'api_directory':
return '/api/directory/' + data['sha1_git'] + '/'
elif fn == 'api_person':
return '/api/person/' + data['person_id'] + '/'
mock_flask.url_for.side_effect = url_for_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'directory': '123',
'author': {'id': '1'},
'committer': {'id': '2'},
})
expected_revision = {
'id': 'rev-id',
'directory': '123',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'directory_url': '/api/directory/123/',
'author': {'id': '1'},
'author_url': '/api/person/1/',
'committer': {'id': '2'},
'committer_url': '/api/person/2/'
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_person',
person_id='1'),
call('api_person',
person_id='2'),
call('api_directory',
sha1_git='123')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_children_and_parent_no_dir(self,
mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': '/api/revision/rev-id/prev/prev-rev/log/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123',
context='prev-rev/rev-id'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_no_context(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456'],
})
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/prev/rev-id/'],
'children': ['456'],
'children_urls': ['/api/revision/456/']
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision',
sha1_git='123',
context='rev-id'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_context_empty_prev_list(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'parents': ['123'],
'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'parents': ['123'],
'children': ['456']}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123',
context='prev-rev/rev-id'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_context_some_prev_list(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_context_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev1-rev/prev0-rev/log/'),
'parents': ['123'],
'parent_urls': ['/api/revision/123/prev/'
'prev1-rev/prev0-rev/rev-id/'],
'children': ['456'],
'children_urls': ['/api/revision/456/',
'/api/revision/prev0-rev/prev/prev1-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'parents': ['123'],
'children': ['456']}, context='prev1-rev/prev0-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev0-rev',
context='prev1-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev1-rev/prev0-rev'),
call('api_revision',
sha1_git='123',
context='prev1-rev/prev0-rev/rev-id'),
call('api_revision',
sha1_git='456')])
@nottest
def _url_for_rev_message_test(self, fn, **data):
if fn == 'api_revision':
if 'context' in data and data['context'] is not None:
return '/api/revision/%s/prev/%s/' % (data['sha1_git'], data['context']) # noqa
else:
return '/api/revision/%s/' % data['sha1_git']
elif fn == 'api_revision_log':
if 'prev_sha1s' in data and data['prev_sha1s'] is not None:
return '/api/revision/%s/prev/%s/log/' % (data['sha1_git'], data['prev_sha1s']) # noqa
else:
return '/api/revision/%s/log/' % data['sha1_git']
elif fn == 'api_revision_raw_message':
return '/api/revision/' + data['sha1_git'] + '/raw/'
else:
return '/api/revision/' + data['sha1_git_root'] + '/history/' + data['sha1_git'] + '/' # noqa
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_no_message(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_rev_message_test
# when
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'parents': ['123'],
'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123',
context='prev-rev/rev-id'),
call('api_revision',
sha1_git='456')])
@patch('swh.web.ui.utils.flask')
@istest
def enrich_revision_with_invalid_message(self, mock_flask):
# given
mock_flask.url_for.side_effect = self._url_for_rev_message_test
# when
actual_revision = utils.enrich_revision({
'id': 'rev-id',
'message': None,
'message_decoding_failed': True,
'parents': ['123'],
'children': ['456'],
}, context='prev-rev')
expected_revision = {
'id': 'rev-id',
'url': '/api/revision/rev-id/',
'history_url': '/api/revision/rev-id/log/',
'history_context_url': ('/api/revision/rev-id/'
'prev/prev-rev/log/'),
'message': None,
'message_decoding_failed': True,
'message_url': '/api/revision/rev-id/raw/',
'parents': ['123'],
'parent_urls': ['/api/revision/123/prev/prev-rev/rev-id/'],
'children': ['456'],
'children_urls': ['/api/revision/456/', '/api/revision/prev-rev/'],
}
# then
self.assertEqual(actual_revision, expected_revision)
mock_flask.url_for.assert_has_calls(
[call('api_revision',
sha1_git='prev-rev'),
call('api_revision',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id'),
call('api_revision_log',
sha1_git='rev-id',
prev_sha1s='prev-rev'),
call('api_revision',
sha1_git='123',
context='prev-rev/rev-id'),
call('api_revision',
sha1_git='456')])
+
+ @patch('swh.web.ui.utils.flask')
+ def next_page(self, mock_flask):
+ mock_flask.url_for.return_value = '/some/api/url/'
+
+ # when
+ actual_url = utils.next_page('some-function-name', q='barfoo', page=10)
+
+ # then
+ self.assertEquals(actual_url, '/some/api/url/?page=11')
+
+ mock_flask.url_for.assert_called_once_with('some-function-name',
+ q='barfoo')
+
+ @patch('swh.web.ui.utils.flask')
+ def prev_page(self, mock_flask):
+ mock_flask.url_for.return_value = '/some/api/url/'
+
+ # when
+ actual_url = utils.prev_page('some-function-name', q='barfoo', page=10)
+
+ # then
+ self.assertEquals(actual_url, '/some/api/url/?page=9')
+
+ mock_flask.url_for.assert_called_once_with('some-function-name',
+ q='barfoo')
diff --git a/swh/web/ui/tests/views/test_api.py b/swh/web/ui/tests/views/test_api.py
index e7efd98b..282109c1 100644
--- a/swh/web/ui/tests/views/test_api.py
+++ b/swh/web/ui/tests/views/test_api.py
@@ -1,2241 +1,2247 @@
# Copyright (C) 2015 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import json
import unittest
import yaml
from nose.tools import istest
from unittest.mock import patch, MagicMock
from swh.web.ui.tests import test_app
from swh.web.ui import exc
from swh.web.ui.views import api
from swh.web.ui.exc import NotFoundExc, BadInputExc
from swh.storage.exc import StorageDBError, StorageAPIError
class ApiTestCase(test_app.SWHApiTestCase):
def setUp(self):
self.origin_visit1 = {
'date': 1104616800.0,
'origin': 10,
'visit': 100,
'metadata': None,
'status': 'full',
}
self.origin1 = {
'id': 1234,
'lister': 'uuid-lister-0',
'project': 'uuid-project-0',
'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'
}
@istest
def generic_api_lookup_nothing_is_found(self):
# given
def test_generic_lookup_fn(sha1, another_unused_arg):
assert another_unused_arg == 'unused arg'
assert sha1 == 'sha1'
return None
# when
with self.assertRaises(NotFoundExc) as cm:
api._api_lookup('sha1', test_generic_lookup_fn,
'This will be raised because None is returned.',
lambda x: x,
'unused arg')
self.assertIn('This will be raised because None is returned.',
cm.exception.args[0])
@istest
def generic_api_map_are_enriched_and_transformed_to_list(self):
# given
def test_generic_lookup_fn_1(criteria0, param0, param1):
assert criteria0 == 'something'
return map(lambda x: x + 1, [1, 2, 3])
# when
actual_result = api._api_lookup(
'something',
test_generic_lookup_fn_1,
'This is not the error message you are looking for. Move along.',
lambda x: x * 2,
'some param 0',
'some param 1')
self.assertEqual(actual_result, [4, 6, 8])
@istest
def generic_api_list_are_enriched_too(self):
# given
def test_generic_lookup_fn_2(crit):
assert crit == 'something'
return ['a', 'b', 'c']
# when
actual_result = api._api_lookup(
'something',
test_generic_lookup_fn_2,
'Not the error message you are looking for, it is. '
'Along, you move!',
lambda x: ''. join(['=', x, '=']))
self.assertEqual(actual_result, ['=a=', '=b=', '=c='])
@istest
def generic_api_generator_are_enriched_and_returned_as_list(self):
# given
def test_generic_lookup_fn_3(crit):
assert crit == 'crit'
return (i for i in [4, 5, 6])
# when
actual_result = api._api_lookup(
'crit',
test_generic_lookup_fn_3,
'Move!',
lambda x: x - 1)
self.assertEqual(actual_result, [3, 4, 5])
@istest
def generic_api_simple_data_are_enriched_and_returned_too(self):
# given
def test_generic_lookup_fn_4(crit):
assert crit == '123'
return {'a': 10}
def test_enrich_data(x):
x['a'] = x['a'] * 10
return x
# when
actual_result = api._api_lookup(
'123',
test_generic_lookup_fn_4,
'Nothing to do',
test_enrich_data)
self.assertEqual(actual_result, {'a': 100})
@patch('swh.web.ui.views.api.service')
@istest
def api_content_filetype(self, mock_service):
stub_filetype = {
'mimetype': 'application/xml',
'encoding': 'ascii',
'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
}
mock_service.lookup_content_filetype.return_value = stub_filetype
# when
rv = self.app.get(
'/api/1/filetype/'
'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'mimetype': 'application/xml',
'encoding': 'ascii',
'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'content_url': '/api/1/content/'
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
})
mock_service.lookup_content_filetype.assert_called_once_with(
'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_filetype_sha_not_found(self, mock_service):
# given
mock_service.lookup_content_filetype.return_value = None
# when
rv = self.app.get(
'/api/1/filetype/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No filetype information found for content '
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.'
})
mock_service.lookup_content_filetype.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_language(self, mock_service):
stub_language = {
'lang': 'lisp',
'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
}
mock_service.lookup_content_language.return_value = stub_language
# when
rv = self.app.get(
'/api/1/language/'
'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'lang': 'lisp',
'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'content_url': '/api/1/content/'
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
})
mock_service.lookup_content_language.assert_called_once_with(
'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_language_sha_not_found(self, mock_service):
# given
mock_service.lookup_content_language.return_value = None
# when
rv = self.app.get(
'/api/1/language/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No language information found for content '
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.'
})
mock_service.lookup_content_language.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_fulltext_search(self, mock_service):
stub_ctag = [{
'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'foobar',
'kind': 'Haskell',
'line': 10,
}]
mock_service.lookup_expression.return_value = stub_ctag
# when
- rv = self.app.get('/api/1/symbol/foo/')
+ rv = self.app.get('/api/1/symbol/foo/?page=2')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, [{
'sha1': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'name': 'foobar',
'kind': 'Haskell',
'line': 10,
'data_url': '/api/1/content/'
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
'license_url': '/api/1/license/'
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
'language_url': '/api/1/language/'
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
'filetype_url': '/api/1/filetype/'
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
}])
+ actual_headers = dict(rv.headers)
+ self.assertEquals(actual_headers['Link'],
+ '; rel="next",'
+ '; rel="previous"')
- mock_service.lookup_expression.assert_called_once_with('foo', 1)
+ mock_service.lookup_expression.assert_called_once_with('foo', 2)
@patch('swh.web.ui.views.api.service')
@istest
def api_fulltext_search_not_found(self, mock_service):
# given
mock_service.lookup_expression.return_value = []
# when
rv = self.app.get('/api/1/symbol/bar/?page=2')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No indexed raw content match expression \'bar\'.'
})
+ actual_headers = dict(rv.headers)
+ self.assertFalse('Link' in actual_headers)
mock_service.lookup_expression.assert_called_once_with('bar', 2)
@patch('swh.web.ui.views.api.service')
@istest
def api_content_license(self, mock_service):
stub_license = {
'licenses': ['No_license_found', 'Apache-2.0'],
'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'tool_name': 'nomos',
}
mock_service.lookup_content_license.return_value = stub_license
# when
rv = self.app.get(
'/api/1/license/'
'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'licenses': ['No_license_found', 'Apache-2.0'],
'id': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'tool_name': 'nomos',
'content_url': '/api/1/content/'
'sha1:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
})
mock_service.lookup_content_license.assert_called_once_with(
'sha1_git:b04caf10e9535160d90e874b45aa426de762f19f')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_license_sha_not_found(self, mock_service):
# given
mock_service.lookup_content_license.return_value = None
# when
rv = self.app.get(
'/api/1/license/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No license information found for content '
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03.'
})
mock_service.lookup_content_license.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_provenance(self, mock_service):
stub_provenances = [{
'origin': 1,
'visit': 2,
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
}]
mock_service.lookup_content_provenance.return_value = stub_provenances
# when
rv = self.app.get(
'/api/1/provenance/'
'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, [{
'origin': 1,
'visit': 2,
'origin_url': '/api/1/origin/1/',
'origin_visits_url': '/api/1/origin/1/visits/',
'origin_visit_url': '/api/1/origin/1/visits/2/',
'revision': 'b04caf10e9535160d90e874b45aa426de762f19f',
'revision_url': '/api/1/revision/'
'b04caf10e9535160d90e874b45aa426de762f19f/',
'content': '34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'content_url': '/api/1/content/'
'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
'path': 'octavio-3.4.0/octave.html/doc_002dS_005fISREG.html'
}])
mock_service.lookup_content_provenance.assert_called_once_with(
'sha1_git:34571b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_provenance_sha_not_found(self, mock_service):
# given
mock_service.lookup_content_provenance.return_value = None
# when
rv = self.app.get(
'/api/1/provenance/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_content_provenance.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_metadata(self, mock_service):
# given
mock_service.lookup_content.return_value = {
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560'
'cde9b067a4f',
'length': 17,
'status': 'visible'
}
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'data_url': '/api/1/content/'
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/raw/',
'filetype_url': '/api/1/filetype/'
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
'language_url': '/api/1/language/'
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
'license_url': '/api/1/license/'
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03/',
'sha1': '40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03',
'sha1_git': 'b4e8f472ffcb01a03875b26e462eb568739f6882',
'sha256': '83c0e67cc80f60caf1fcbec2d84b0ccd7968b3be4735637006560c'
'de9b067a4f',
'length': 17,
'status': 'visible'
})
mock_service.lookup_content.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_not_found_as_json(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_content_provenance = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
mock_service.lookup_content_provenance.called = False
@patch('swh.web.ui.views.api.service')
@istest
def api_content_not_found_as_yaml(self, mock_service):
# given
mock_service.lookup_content.return_value = None
mock_service.lookup_content_provenance = MagicMock()
# when
rv = self.app.get(
'/api/1/content/sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c/',
headers={'accept': 'application/yaml'})
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha256:83c0e67cc80f60caf1fcbec2d84b0ccd79'
'68b3be4735637006560c not found.'
})
mock_service.lookup_content.assert_called_once_with(
'sha256:83c0e67cc80f60caf1fcbec2d84b0ccd7968b3'
'be4735637006560c')
mock_service.lookup_content_provenance.called = False
@patch('swh.web.ui.views.api.service')
@istest
def api_content_raw_ko_not_found(self, mock_service):
# given
mock_service.lookup_content_raw.return_value = None
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Content with sha1:40e71b8614fcd89ccd17ca2b1d9e6'
'6c5b00a6d03 not found.'
})
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_content_raw(self, mock_service):
# given
stub_content = {'data': b'some content data'}
mock_service.lookup_content_raw.return_value = stub_content
# when
rv = self.app.get(
'/api/1/content/sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03'
'/raw/',
headers={'Content-type': 'application/octet-stream',
'Content-disposition': 'attachment'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/octet-stream')
self.assertEquals(rv.data, stub_content['data'])
mock_service.lookup_content_raw.assert_called_once_with(
'sha1:40e71b8614fcd89ccd17ca2b1d9e66c5b00a6d03')
@patch('swh.web.ui.views.api.service')
@istest
def api_search(self, mock_service):
# given
mock_service.search_hash.return_value = {'found': True}
expected_result = {
'search_stats': {'nbfiles': 1, 'pct': 100},
'search_res': [{'filename': None,
'sha1': 'sha1:blah',
'found': True}]
}
# when
rv = self.app.get('/api/1/content/search/sha1:blah/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_result)
mock_service.search_hash.assert_called_once_with('sha1:blah')
@patch('swh.web.ui.views.api.service')
@istest
def api_search_as_yaml(self, mock_service):
# given
mock_service.search_hash.return_value = {'found': True}
expected_result = {
'search_stats': {'nbfiles': 1, 'pct': 100},
'search_res': [{'filename': None,
'sha1': 'sha1:halb',
'found': True}]
}
# when
rv = self.app.get('/api/1/content/search/sha1:halb/',
headers={'Accept': 'application/yaml'})
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/yaml')
response_data = yaml.load(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_result)
mock_service.search_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.views.api.service')
@istest
def api_search_not_found(self, mock_service):
# given
mock_service.search_hash.return_value = {'found': False}
expected_result = {
'search_stats': {'nbfiles': 1, 'pct': 0},
'search_res': [{'filename': None,
'sha1': 'sha1:halb',
'found': False}]
}
# when
rv = self.app.get('/api/1/content/search/sha1:halb/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_result)
mock_service.search_hash.assert_called_once_with('sha1:halb')
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_error(self, mock_service):
# given
mock_service.stat_counters.side_effect = ValueError(
'voluntary error to check the bad request middleware.')
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_swh_storage_error_db(self, mock_service):
# given
mock_service.stat_counters.side_effect = StorageDBError(
'SWH Storage exploded! Will be back online shortly!')
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 503)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
'An unexpected error occurred in the backend: '
'SWH Storage exploded! Will be back online shortly!'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters_raise_swh_storage_error_api(self, mock_service):
# given
mock_service.stat_counters.side_effect = StorageAPIError(
'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
)
# when
rv = self.app.get('/api/1/stat/counters/')
# then
self.assertEquals(rv.status_code, 503)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
'An unexpected error occurred in the api backend: '
'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_stat_counters(self, mock_service):
# given
stub_stats = {
"content": 1770830,
"directory": 211683,
"directory_entry_dir": 209167,
"directory_entry_file": 1807094,
"directory_entry_rev": 0,
"entity": 0,
"entity_history": 0,
"occurrence": 0,
"occurrence_history": 19600,
"origin": 1096,
"person": 0,
"release": 8584,
"revision": 7792,
"revision_history": 0,
"skipped_content": 0
}
mock_service.stat_counters.return_value = stub_stats
# when
rv = self.app.get('/api/1/stat/counters/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_stats)
mock_service.stat_counters.assert_called_once_with()
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_error(self, mock_service):
# given
mock_service.lookup_origin_visits.side_effect = ValueError(
'voluntary error to check the bad request middleware.')
# when
rv = self.app.get('/api/1/origin/2/visits/')
# then
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'voluntary error to check the bad request middleware.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_swh_storage_error_db(
self, mock_service):
# given
mock_service.lookup_origin_visits.side_effect = StorageDBError(
'SWH Storage exploded! Will be back online shortly!')
# when
rv = self.app.get('/api/1/origin/2/visits/')
# then
self.assertEquals(rv.status_code, 503)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
'An unexpected error occurred in the backend: '
'SWH Storage exploded! Will be back online shortly!'})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits_raise_swh_storage_error_api(
self, mock_service):
# given
mock_service.lookup_origin_visits.side_effect = StorageAPIError(
'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
)
# when
rv = self.app.get('/api/1/origin/2/visits/')
# then
self.assertEquals(rv.status_code, 503)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
'An unexpected error occurred in the api backend: '
'SWH Storage API dropped dead! Will resurrect from its ashes asap!'
})
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visits(self, mock_service):
# given
stub_visits = [
{
'date': 1104616800.0,
'origin': 1,
'visit': 1
},
{
'date': 1293919200.0,
'origin': 1,
'visit': 2
},
{
'date': 1420149600.0,
'origin': 1,
'visit': 3
}
]
mock_service.lookup_origin_visits.return_value = stub_visits
# when
rv = self.app.get('/api/1/origin/2/visits/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, [
{
'date': 1104616800.0,
'origin': 1,
'visit': 1,
'origin_visit_url': '/api/1/origin/1/visits/1/',
},
{
'date': 1293919200.0,
'origin': 1,
'visit': 2,
'origin_visit_url': '/api/1/origin/1/visits/2/',
},
{
'date': 1420149600.0,
'origin': 1,
'visit': 3,
'origin_visit_url': '/api/1/origin/1/visits/3/',
}
])
mock_service.lookup_origin_visits.assert_called_once_with(2)
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visit(self, mock_service):
# given
origin_visit = self.origin_visit1.copy()
origin_visit.update({
'occurrences': {
'master': {
'target_type': 'revision',
'target': 'revision-id',
}
}
})
mock_service.lookup_origin_visit.return_value = origin_visit
expected_origin_visit = self.origin_visit1.copy()
expected_origin_visit.update({
'origin_url': '/api/1/origin/10/',
'occurrences': {
'master': {
'target_type': 'revision',
'target': 'revision-id',
'target_url': '/api/1/revision/revision-id/'
}
}
})
# when
rv = self.app.get('/api/1/origin/10/visits/100/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_origin_visit)
mock_service.lookup_origin_visit.assert_called_once_with(10, 100)
@patch('swh.web.ui.views.api.service')
@istest
def api_1_lookup_origin_visit_not_found(self, mock_service):
# given
mock_service.lookup_origin_visit.return_value = None
# when
rv = self.app.get('/api/1/origin/1/visits/1000/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No visit 1000 for origin 1 found'
})
mock_service.lookup_origin_visit.assert_called_once_with(1, 1000)
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_by_id(self, mock_service):
# given
mock_service.lookup_origin.return_value = self.origin1
expected_origin = self.origin1.copy()
expected_origin.update({
'origin_visits_url': '/api/1/origin/1234/visits/'
})
# when
rv = self.app.get('/api/1/origin/1234/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_origin)
mock_service.lookup_origin.assert_called_with({'id': 1234})
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_by_type_url(self, mock_service):
# given
stub_origin = self.origin1.copy()
stub_origin.update({
'id': 987
})
mock_service.lookup_origin.return_value = stub_origin
expected_origin = stub_origin.copy()
expected_origin.update({
'origin_visits_url': '/api/1/origin/987/visits/'
})
# when
rv = self.app.get('/api/1/origin/ftp/url/ftp://some/url/to/origin/0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_origin)
mock_service.lookup_origin.assert_called_with(
{'url': 'ftp://some/url/to/origin/0',
'type': 'ftp'})
@patch('swh.web.ui.views.api.service')
@istest
def api_origin_not_found(self, mock_service):
# given
mock_service.lookup_origin.return_value = None
# when
rv = self.app.get('/api/1/origin/4321/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Origin with id 4321 not found.'
})
mock_service.lookup_origin.assert_called_with({'id': 4321})
@patch('swh.web.ui.views.api.service')
@istest
def api_release(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'target_type': 'revision',
'target': 'revision-sha1',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
expected_release = {
'id': 'release-0',
'target_type': 'revision',
'target': 'revision-sha1',
'target_url': '/api/1/revision/revision-sha1/',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_release)
mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.views.api.service')
@istest
def api_release_target_type_not_a_revision(self, mock_service):
# given
stub_release = {
'id': 'release-0',
'target_type': 'other-stuff',
'target': 'other-stuff-checksum',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
expected_release = {
'id': 'release-0',
'target_type': 'other-stuff',
'target': 'other-stuff-checksum',
"date": "Mon, 10 Mar 1997 08:00:00 GMT",
"synthetic": True,
'author': {
'name': 'author release name',
'email': 'author@email',
},
}
mock_service.lookup_release.return_value = stub_release
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_release)
mock_service.lookup_release.assert_called_once_with('release-0')
@patch('swh.web.ui.views.api.service')
@istest
def api_release_not_found(self, mock_service):
# given
mock_service.lookup_release.return_value = None
# when
rv = self.app.get('/api/1/release/release-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Release with sha1_git release-0 not found.'
})
@patch('swh.web.ui.views.api.service')
@istest
def api_revision(self, mock_service):
# given
stub_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
mock_service.lookup_revision.return_value = stub_revision
expected_revision = {
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233e'
'ff7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6'
'a42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'
],
'parent_urls': [
'/api/1/revision/8734ef7e7c357ce2af928115c6c6a42b7e2a44e7'
'/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
],
'type': 'tar',
'synthetic': True,
'metadata': {
'original_artifact': [{
'archive_type': 'tar',
'name': 'webbase-5.7.0.tar.gz',
'sha1': '147f73f369733d088b7a6fa9c4e0273dcd3c7ccd',
'sha1_git': '6a15ea8b881069adedf11feceec35588f2cfe8f1',
'sha256': '401d0df797110bea805d358b85bcc1ced29549d3d73f'
'309d36484e7edf7bb912'
}]
},
}
# when
rv = self.app.get('/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_revision)
mock_service.lookup_revision.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_not_found(self, mock_service):
# given
mock_service.lookup_revision.return_value = None
# when
rv = self.app.get('/api/1/revision/revision-0/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git revision-0 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ok(self, mock_service):
# given
stub_revision = {'message': 'synthetic revision message'}
mock_service.lookup_revision_message.return_value = stub_revision
# when
rv = self.app.get('/api/1/revision/18d8be353ed3480476f032475e7c2'
'33eff7371d5/raw/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/octet-stream')
self.assertEquals(rv.data, b'synthetic revision message')
mock_service.lookup_revision_message.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ok_no_msg(self, mock_service):
# given
mock_service.lookup_revision_message.side_effect = NotFoundExc(
'No message for revision')
# when
rv = self.app.get('/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/raw/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No message for revision'})
self.assertEquals
mock_service.lookup_revision_message.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_raw_ko_no_rev(self, mock_service):
# given
mock_service.lookup_revision_message.side_effect = NotFoundExc(
'No revision found')
# when
rv = self.app.get('/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/raw/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'No revision found'})
mock_service.lookup_revision_message.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin_not_found(self, mock_service):
mock_service.lookup_revision_by.return_value = None
rv = self.app.get('/api/1/revision/origin/123/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertIn('Revision with (origin_id: 123', response_data['error'])
self.assertIn('not found', response_data['error'])
mock_service.lookup_revision_by.assert_called_once_with(
123,
'refs/heads/master',
None)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin(self, mock_service):
mock_revision = {
'id': '32',
'directory': '21',
'message': 'message 1',
'type': 'deb',
}
expected_revision = {
'id': '32',
'url': '/api/1/revision/32/',
'history_url': '/api/1/revision/32/log/',
'directory': '21',
'directory_url': '/api/1/directory/21/',
'message': 'message 1',
'type': 'deb',
}
mock_service.lookup_revision_by.return_value = mock_revision
rv = self.app.get('/api/1/revision/origin/1/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/heads/master',
None)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_with_origin_and_branch_name(self, mock_service):
mock_revision = {
'id': '12',
'directory': '23',
'message': 'message 2',
'type': 'tar',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '12',
'url': '/api/1/revision/12/',
'history_url': '/api/1/revision/12/log/',
'directory': '23',
'directory_url': '/api/1/directory/23/',
'message': 'message 2',
'type': 'tar',
}
rv = self.app.get('/api/1/revision/origin/1/branch/refs/origin/dev/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
None)
@patch('swh.web.ui.views.api.service')
@patch('swh.web.ui.views.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp(self,
mock_utils,
mock_service):
mock_revision = {
'id': '123',
'directory': '456',
'message': 'message 3',
'type': 'tar',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '123',
'url': '/api/1/revision/123/',
'history_url': '/api/1/revision/123/log/',
'directory': '456',
'directory_url': '/api/1/directory/456/',
'message': 'message 3',
'type': 'tar',
}
mock_utils.parse_timestamp.return_value = 'parsed-date'
mock_utils.enrich_revision.return_value = expected_revision
rv = self.app.get('/api/1/revision'
'/origin/1'
'/branch/refs/origin/dev'
'/ts/1452591542/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
'parsed-date')
mock_utils.parse_timestamp.assert_called_once_with('1452591542')
mock_utils.enrich_revision.assert_called_once_with(
mock_revision)
@patch('swh.web.ui.views.api.service')
@patch('swh.web.ui.views.api.utils')
@istest
def api_revision_with_origin_and_branch_name_and_timestamp_with_escapes(
self,
mock_utils,
mock_service):
mock_revision = {
'id': '999',
}
mock_service.lookup_revision_by.return_value = mock_revision
expected_revision = {
'id': '999',
'url': '/api/1/revision/999/',
'history_url': '/api/1/revision/999/log/',
}
mock_utils.parse_timestamp.return_value = 'parsed-date'
mock_utils.enrich_revision.return_value = expected_revision
rv = self.app.get('/api/1/revision'
'/origin/1'
'/branch/refs%2Forigin%2Fdev'
'/ts/Today%20is%20'
'January%201,%202047%20at%208:21:00AM/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_revision)
mock_service.lookup_revision_by.assert_called_once_with(
1,
'refs/origin/dev',
'parsed-date')
mock_utils.parse_timestamp.assert_called_once_with(
'Today is January 1, 2047 at 8:21:00AM')
mock_utils.enrich_revision.assert_called_once_with(
mock_revision)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_ko_raise(self, mock_service):
# given
mock_service.lookup_directory_through_revision.side_effect = NotFoundExc('not') # noqa
# when
with self.assertRaises(NotFoundExc):
api._revision_directory_by(
{'sha1_git': 'id'},
None,
'/api/1/revision/sha1/directory/')
# then
mock_service.lookup_directory_through_revision.assert_called_once_with(
{'sha1_git': 'id'},
None, limit=100, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_type_dir(self, mock_service):
# given
mock_service.lookup_directory_through_revision.return_value = (
'rev-id',
{
'type': 'dir',
'revision': 'rev-id',
'path': 'some/path',
'content': []
})
# when
actual_dir_content = api._revision_directory_by(
{'sha1_git': 'blah-id'},
'some/path', '/api/1/revision/sha1/directory/')
# then
self.assertEquals(actual_dir_content, {
'type': 'dir',
'revision': 'rev-id',
'path': 'some/path',
'content': []
})
mock_service.lookup_directory_through_revision.assert_called_once_with(
{'sha1_git': 'blah-id'},
'some/path', limit=100, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def revision_directory_by_type_file(self, mock_service):
# given
mock_service.lookup_directory_through_revision.return_value = (
'rev-id',
{
'type': 'file',
'revision': 'rev-id',
'path': 'some/path',
'content': {'blah': 'blah'}
})
# when
actual_dir_content = api._revision_directory_by(
{'sha1_git': 'sha1'},
'some/path',
'/api/1/revision/origin/2/directory/',
limit=1000, with_data=True)
# then
self.assertEquals(actual_dir_content, {
'type': 'file',
'revision': 'rev-id',
'path': 'some/path',
'content': {'blah': 'blah'}
})
mock_service.lookup_directory_through_revision.assert_called_once_with(
{'sha1_git': 'sha1'},
'some/path', limit=1000, with_data=True)
@patch('swh.web.ui.views.api.utils')
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_directory_through_revision_origin_ko_not_found(self,
mock_rev_dir,
mock_utils):
mock_rev_dir.side_effect = NotFoundExc('not found')
mock_utils.parse_timestamp.return_value = '2012-10-20 00:00:00'
rv = self.app.get('/api/1/revision'
'/origin/10'
'/branch/refs/remote/origin/dev'
'/ts/2012-10-20'
'/directory/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, {
'error': 'not found'})
mock_rev_dir.assert_called_once_with(
{'origin_id': 10,
'branch_name': 'refs/remote/origin/dev',
'ts': '2012-10-20 00:00:00'}, None,
'/api/1/revision'
'/origin/10'
'/branch/refs/remote/origin/dev'
'/ts/2012-10-20'
'/directory/',
with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_directory_through_revision_origin(self,
mock_revision_dir):
expected_res = [{
'id': '123'
}]
mock_revision_dir.return_value = expected_res
rv = self.app.get('/api/1/revision/origin/3/directory/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEqual(response_data, expected_res)
mock_revision_dir.assert_called_once_with({
'origin_id': 3,
'branch_name': 'refs/heads/master',
'ts': None}, None, '/api/1/revision/origin/3/directory/',
with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log(self, mock_service):
# given
stub_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'type': 'tar',
'synthetic': True,
}]
mock_service.lookup_revision_log.return_value = stub_revisions
expected_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
'f7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
'42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
],
'parent_urls': [
'/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
'/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
],
'type': 'tar',
'synthetic': True,
}]
expected_response = {
'revisions': expected_revisions,
'next_revs_url': None
}
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
'b7e2a44e6/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_response)
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_with_next(self, mock_service):
# given
stub_revisions = []
for i in range(27):
stub_revisions.append({'id': i})
mock_service.lookup_revision_log.return_value = stub_revisions[:26]
expected_revisions = [x for x in stub_revisions if x['id'] < 25]
for e in expected_revisions:
e['url'] = '/api/1/revision/%s/' % e['id']
e['history_url'] = '/api/1/revision/%s/log/' % e['id']
expected_response = {
'revisions': expected_revisions,
'next_revs_url': '/api/1/revision/25/log/'
}
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42'
'b7e2a44e6/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_response)
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_not_found(self, mock_service):
# given
mock_service.lookup_revision_log.return_value = None
# when
rv = self.app.get('/api/1/revision/8834ef7e7c357ce2af928115c6c6a42b7'
'e2a44e6/log/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Revision with sha1_git'
' 8834ef7e7c357ce2af928115c6c6a42b7e2a44e6 not found.'})
mock_service.lookup_revision_log.assert_called_once_with(
'8834ef7e7c357ce2af928115c6c6a42b7e2a44e6', 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_context(self, mock_service):
# given
stub_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'type': 'tar',
'synthetic': True,
}]
mock_service.lookup_revision_log.return_value = stub_revisions
mock_service.lookup_revision_multiple.return_value = [{
'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
'author_name': 'Name Surname',
'author_email': 'name@surname.com',
'committer_name': 'Name Surname',
'committer_email': 'name@surname.com',
'message': 'amazing revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
'type': 'tar',
'synthetic': True,
}]
expected_revisions = [
{
'url': '/api/1/revision/'
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
'history_url': '/api/1/revision/'
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/log/',
'id': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory_url': '/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/',
'author_name': 'Name Surname',
'author_email': 'name@surname.com',
'committer_name': 'Name Surname',
'committer_email': 'name@surname.com',
'message': 'amazing revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'],
'parent_urls': [
'/api/1/revision/adc83b19e793491b1c6ea0fd8b46cd9f32e592fc'
'/prev/7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/'
],
'type': 'tar',
'synthetic': True,
},
{
'url': '/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/'
'18d8be353ed3480476f032475e7c233eff7371d5/log/',
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/'
'7834ef7e7c357ce2af928115c6c6a42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'parent_urls': [
'/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
'/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
],
'type': 'tar',
'synthetic': True,
}]
expected_response = {
'revisions': expected_revisions,
'next_revs_url': None
}
# when
rv = self.app.get('/api/1/revision/18d8be353ed3480476f0'
'32475e7c233eff7371d5/prev/prev-rev/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_response)
mock_service.lookup_revision_log.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5', 26)
mock_service.lookup_revision_multiple.assert_called_once_with(
['prev-rev'])
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by(self, mock_service):
# given
stub_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': ['7834ef7e7c357ce2af928115c6c6a42b7e2a4345'],
'type': 'tar',
'synthetic': True,
}]
mock_service.lookup_revision_log_by.return_value = stub_revisions
expected_revisions = [{
'id': '18d8be353ed3480476f032475e7c233eff7371d5',
'url': '/api/1/revision/18d8be353ed3480476f032475e7c233eff7371d5/',
'history_url': '/api/1/revision/18d8be353ed3480476f032475e7c233ef'
'f7371d5/log/',
'directory': '7834ef7e7c357ce2af928115c6c6a42b7e2a44e6',
'directory_url': '/api/1/directory/7834ef7e7c357ce2af928115c6c6a'
'42b7e2a44e6/',
'author_name': 'Software Heritage',
'author_email': 'robot@softwareheritage.org',
'committer_name': 'Software Heritage',
'committer_email': 'robot@softwareheritage.org',
'message': 'synthetic revision message',
'date_offset': 0,
'committer_date_offset': 0,
'parents': [
'7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
],
'parent_urls': [
'/api/1/revision/7834ef7e7c357ce2af928115c6c6a42b7e2a4345'
'/prev/18d8be353ed3480476f032475e7c233eff7371d5/'
],
'type': 'tar',
'synthetic': True,
}]
expected_result = {
'revisions': expected_revisions,
'next_revs_url': None
}
# when
rv = self.app.get('/api/1/revision/origin/1/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_result)
mock_service.lookup_revision_log_by.assert_called_once_with(
1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by_with_next(self, mock_service):
# given
stub_revisions = []
for i in range(27):
stub_revisions.append({'id': i})
mock_service.lookup_revision_log_by.return_value = stub_revisions[:26]
expected_revisions = [x for x in stub_revisions if x['id'] < 25]
for e in expected_revisions:
e['url'] = '/api/1/revision/%s/' % e['id']
e['history_url'] = '/api/1/revision/%s/log/' % e['id']
expected_response = {
'revisions': expected_revisions,
'next_revs_url': '/api/1/revision/25/log/'
}
# when
rv = self.app.get('/api/1/revision/origin/1/log/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_response)
mock_service.lookup_revision_log_by.assert_called_once_with(
1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_log_by_norev(self, mock_service):
# given
mock_service.lookup_revision_log_by.side_effect = NotFoundExc(
'No revision')
# when
rv = self.app.get('/api/1/revision/origin/1/log/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {'error': 'No revision'})
mock_service.lookup_revision_log_by.assert_called_once_with(
1, 'refs/heads/master', None, 26)
@patch('swh.web.ui.views.api.service')
@istest
def api_revision_history(self, mock_service):
# for readability purposes, we use:
# - sha1 as 3 letters (url are way too long otherwise to respect pep8)
# - only keys with modification steps (all other keys are kept as is)
# given
stub_revision = {
'id': '883',
'children': ['777', '999'],
'parents': [],
'directory': '272'
}
mock_service.lookup_revision.return_value = stub_revision
# then
rv = self.app.get('/api/1/revision/883/prev/999/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'id': '883',
'url': '/api/1/revision/883/',
'history_url': '/api/1/revision/883/log/',
'history_context_url': '/api/1/revision/883/prev/999/log/',
'children': ['777', '999'],
'children_urls': ['/api/1/revision/777/',
'/api/1/revision/999/'],
'parents': [],
'parent_urls': [],
'directory': '272',
'directory_url': '/api/1/directory/272/'
})
mock_service.lookup_revision.assert_called_once_with('883')
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ko_not_found(self, mock_rev_dir):
# given
mock_rev_dir.side_effect = NotFoundExc('Not found')
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/to/dir/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Not found'})
mock_rev_dir.assert_called_once_with(
{'sha1_git': '999'},
'some/path/to/dir',
'/api/1/revision/999/directory/some/path/to/dir/',
with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ok_returns_dir_entries(self, mock_rev_dir):
stub_dir = {
'type': 'dir',
'revision': '999',
'content': [
{
'sha1_git': '789',
'type': 'file',
'target': '101',
'target_url': '/api/1/content/sha1_git:101/',
'name': 'somefile',
'file_url': '/api/1/revision/999/directory/some/path/'
'somefile/'
},
{
'sha1_git': '123',
'type': 'dir',
'target': '456',
'target_url': '/api/1/directory/456/',
'name': 'to-subdir',
'dir_url': '/api/1/revision/999/directory/some/path/'
'to-subdir/',
}]
}
# given
mock_rev_dir.return_value = stub_dir
# then
rv = self.app.get('/api/1/revision/999/directory/some/path/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_dir)
mock_rev_dir.assert_called_once_with(
{'sha1_git': '999'},
'some/path',
'/api/1/revision/999/directory/some/path/',
with_data=False)
@patch('swh.web.ui.views.api._revision_directory_by')
@istest
def api_revision_directory_ok_returns_content(self, mock_rev_dir):
stub_content = {
'type': 'file',
'revision': '999',
'content': {
'sha1_git': '789',
'sha1': '101',
'data_url': '/api/1/content/101/raw/',
}
}
# given
mock_rev_dir.return_value = stub_content
# then
url = '/api/1/revision/666/directory/some/other/path/'
rv = self.app.get(url)
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_content)
mock_rev_dir.assert_called_once_with(
{'sha1_git': '666'}, 'some/other/path', url, with_data=False)
@patch('swh.web.ui.views.api.service')
@istest
def api_person(self, mock_service):
# given
stub_person = {
'id': '198003',
'name': 'Software Heritage',
'email': 'robot@softwareheritage.org',
}
mock_service.lookup_person.return_value = stub_person
# when
rv = self.app.get('/api/1/person/198003/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, stub_person)
@patch('swh.web.ui.views.api.service')
@istest
def api_person_not_found(self, mock_service):
# given
mock_service.lookup_person.return_value = None
# when
rv = self.app.get('/api/1/person/666/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Person with id 666 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_directory(self, mock_service):
# given
stub_directories = [
{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'target': '4568be353ed3480476f032475e7c233eff737123',
},
{
'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
'type': 'dir',
'target': '8be353ed3480476f032475e7c233eff737123456',
}]
expected_directories = [
{
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'target': '4568be353ed3480476f032475e7c233eff737123',
'target_url': '/api/1/content/'
'sha1_git:4568be353ed3480476f032475e7c233eff737123/',
},
{
'sha1_git': '1d518d8be353ed3480476f032475e7c233eff737',
'type': 'dir',
'target': '8be353ed3480476f032475e7c233eff737123456',
'target_url':
'/api/1/directory/8be353ed3480476f032475e7c233eff737123456/',
}]
mock_service.lookup_directory.return_value = stub_directories
# when
rv = self.app.get('/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_directories)
mock_service.lookup_directory.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5')
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_not_found(self, mock_service):
# given
mock_service.lookup_directory.return_value = []
# when
rv = self.app.get('/api/1/directory/'
'66618d8be353ed3480476f032475e7c233eff737/')
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'Directory with sha1_git '
'66618d8be353ed3480476f032475e7c233eff737 not found.'})
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_with_path_found(self, mock_service):
# given
expected_dir = {
'sha1_git': '18d8be353ed3480476f032475e7c233eff7371d5',
'type': 'file',
'name': 'bla',
'target': '4568be353ed3480476f032475e7c233eff737123',
'target_url': '/api/1/content/'
'sha1_git:4568be353ed3480476f032475e7c233eff737123/',
}
mock_service.lookup_directory_with_path.return_value = expected_dir
# when
rv = self.app.get('/api/1/directory/'
'18d8be353ed3480476f032475e7c233eff7371d5/bla/')
# then
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_dir)
mock_service.lookup_directory_with_path.assert_called_once_with(
'18d8be353ed3480476f032475e7c233eff7371d5', 'bla')
@patch('swh.web.ui.views.api.service')
@istest
def api_directory_with_path_not_found(self, mock_service):
# given
mock_service.lookup_directory_with_path.return_value = None
path = 'some/path/to/dir/'
# when
rv = self.app.get(('/api/1/directory/'
'66618d8be353ed3480476f032475e7c233eff737/%s')
% path)
path = path.strip('/') # Path stripped of lead/trail separators
# then
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': (('Entry with path %s relative to '
'directory with sha1_git '
'66618d8be353ed3480476f032475e7c233eff737 not found.')
% path)})
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid_not_found(self, mock_service):
# when
mock_service.lookup_entity_by_uuid.return_value = []
# when
rv = self.app.get('/api/1/entity/'
'5f4d4c51-498a-4e28-88b3-b3e4e8396cba/')
self.assertEquals(rv.status_code, 404)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error':
"Entity with uuid '5f4d4c51-498a-4e28-88b3-b3e4e8396cba' not " +
"found."})
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'5f4d4c51-498a-4e28-88b3-b3e4e8396cba')
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid_bad_request(self, mock_service):
# when
mock_service.lookup_entity_by_uuid.side_effect = BadInputExc(
'bad input: uuid malformed!')
# when
rv = self.app.get('/api/1/entity/uuid malformed/')
self.assertEquals(rv.status_code, 400)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, {
'error': 'bad input: uuid malformed!'})
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'uuid malformed')
@patch('swh.web.ui.views.api.service')
@istest
def api_lookup_entity_by_uuid(self, mock_service):
# when
stub_entities = [
{
'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
},
{
'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2'
}
]
mock_service.lookup_entity_by_uuid.return_value = stub_entities
expected_entities = [
{
'uuid': '34bd6b1b-463f-43e5-a697-785107f598e4',
'uuid_url': '/api/1/entity/34bd6b1b-463f-43e5-a697-'
'785107f598e4/',
'parent': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
'parent_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
'd1ce2efc9fb2/'
},
{
'uuid': 'aee991a0-f8d7-4295-a201-d1ce2efc9fb2',
'uuid_url': '/api/1/entity/aee991a0-f8d7-4295-a201-'
'd1ce2efc9fb2/'
}
]
# when
rv = self.app.get('/api/1/entity'
'/34bd6b1b-463f-43e5-a697-785107f598e4/')
self.assertEquals(rv.status_code, 200)
self.assertEquals(rv.mimetype, 'application/json')
response_data = json.loads(rv.data.decode('utf-8'))
self.assertEquals(response_data, expected_entities)
mock_service.lookup_entity_by_uuid.assert_called_once_with(
'34bd6b1b-463f-43e5-a697-785107f598e4')
class ApiUtils(unittest.TestCase):
@istest
def api_lookup_not_found(self):
# when
with self.assertRaises(exc.NotFoundExc) as e:
api._api_lookup('something',
lambda x: None,
'this is the error message raised as it is None')
self.assertEqual(e.exception.args[0],
'this is the error message raised as it is None')
@istest
def api_lookup_with_result(self):
# when
actual_result = api._api_lookup('something',
lambda x: x + '!',
'this is the error which won\'t be '
'used here')
self.assertEqual(actual_result, 'something!')
@istest
def api_lookup_with_result_as_map(self):
# when
actual_result = api._api_lookup([1, 2, 3],
lambda x: map(lambda y: y+1, x),
'this is the error which won\'t be '
'used here')
self.assertEqual(actual_result, [2, 3, 4])
diff --git a/swh/web/ui/utils.py b/swh/web/ui/utils.py
index 5a07084a..c82a13f0 100644
--- a/swh/web/ui/utils.py
+++ b/swh/web/ui/utils.py
@@ -1,382 +1,400 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
import datetime
import flask
import re
from dateutil import parser
+def next_page(function_name, q, page):
+ """Compute the next page for function_name with parameter q at page.
+
+ """
+ url = flask.url_for(function_name, q=q)
+ url = '%s?page=%s' % (url, page + 1)
+ return url
+
+
+def prev_page(function_name, q, page):
+ """Compute the previous page for function_name with parameter q at page.
+
+ """
+ url = flask.url_for(function_name, q=q)
+ url = '%s?page=%s' % (url, page - 1)
+ return url
+
+
def filter_endpoints(url_map, prefix_url_rule, blacklist=[]):
"""Filter endpoints by prefix url rule.
Args:
- url_map: Url Werkzeug.Map of rules
- prefix_url_rule: prefix url string
- blacklist: blacklist of some url
Returns:
Dictionary of url_rule with values methods and endpoint.
The key is the url, the associated value is a dictionary of
'methods' (possible http methods) and 'endpoint' (python function)
"""
out = {}
for r in url_map:
rule = r['rule']
if rule == prefix_url_rule or rule in blacklist:
continue
if rule.startswith(prefix_url_rule):
out[rule] = {'methods': sorted(map(str, r['methods'])),
'endpoint': r['endpoint']}
return out
def fmap(f, data):
"""Map f to data at each level.
This must keep the origin data structure type:
- map -> map
- dict -> dict
- list -> list
- None -> None
Args:
f: function that expects one argument.
data: data to traverse to apply the f function.
list, map, dict or bare value.
Returns:
The same data-structure with modified values by the f function.
"""
if not data:
return data
if isinstance(data, map):
return map(lambda y: fmap(f, y), (x for x in data))
if isinstance(data, list):
return [fmap(f, x) for x in data]
if isinstance(data, dict):
return {k: fmap(f, v) for (k, v) in data.items()}
return f(data)
def prepare_data_for_view(data, encoding='utf-8'):
def prepare_data(s):
# Note: can only be 'data' key with bytes of raw content
if isinstance(s, bytes):
try:
return s.decode(encoding)
except:
return "Cannot decode the data bytes, try and set another " \
"encoding in the url (e.g. ?encoding=utf8) or " \
"download directly the " \
"content's raw data."
if isinstance(s, str):
return re.sub(r'/api/1/', r'/browse/', s)
return s
return fmap(prepare_data, data)
def filter_field_keys(data, field_keys):
"""Given an object instance (directory or list), and a csv field keys
to filter on.
Return the object instance with filtered keys.
Note: Returns obj as is if it's an instance of types not in (dictionary,
list)
Args:
- data: one object (dictionary, list...) to filter.
- field_keys: csv or set of keys to filter the object on
Returns:
obj filtered on field_keys
"""
if isinstance(data, map):
return (filter_field_keys(x, field_keys) for x in data)
if isinstance(data, list):
return [filter_field_keys(x, field_keys) for x in data]
if isinstance(data, dict):
return {k: v for (k, v) in data.items() if k in field_keys}
return data
def person_to_string(person):
"""Map a person (person, committer, tagger, etc...) to a string.
"""
return ''.join([person['name'], ' <', person['email'], '>'])
def parse_timestamp(timestamp):
"""Given a time or timestamp (as string), parse the result as datetime.
Returns:
a timezone-aware datetime representing the parsed value. If the parsed
value doesn't specify a timezone, UTC is assumed.
Samples:
- 2016-01-12
- 2016-01-12T09:19:12+0100
- Today is January 1, 2047 at 8:21:00AM
- 1452591542
"""
default_timestamp = datetime.datetime.utcfromtimestamp(0).replace(
tzinfo=datetime.timezone.utc)
try:
res = parser.parse(timestamp, ignoretz=False, fuzzy=True,
default=default_timestamp)
except:
res = datetime.datetime.utcfromtimestamp(float(timestamp)).replace(
tzinfo=datetime.timezone.utc)
return res
def enrich_object(object):
"""Enrich an object (revision, release) with link to the 'target' of
type 'target_type'.
Args:
object: An object with target and target_type keys
(e.g. release, revision)
Returns:
Object enriched with target_url pointing to the right
swh.web.ui.api urls for the pointing object (revision,
release, content, directory)
"""
obj = object.copy()
if 'target' in obj and 'target_type' in obj:
if obj['target_type'] == 'revision':
obj['target_url'] = flask.url_for('api_revision',
sha1_git=obj['target'])
elif obj['target_type'] == 'release':
obj['target_url'] = flask.url_for('api_release',
sha1_git=obj['target'])
elif obj['target_type'] == 'content':
obj['target_url'] = flask.url_for(
'api_content_metadata',
q='sha1_git:' + obj['target'])
elif obj['target_type'] == 'directory':
obj['target_url'] = flask.url_for('api_directory',
q=obj['target'])
return obj
enrich_release = enrich_object
def enrich_directory(directory, context_url=None):
"""Enrich directory with url to content or directory.
"""
if 'type' in directory:
target_type = directory['type']
target = directory['target']
if target_type == 'file':
directory['target_url'] = flask.url_for('api_content_metadata',
q='sha1_git:%s' % target)
if context_url:
directory['file_url'] = context_url + directory['name'] + '/'
else:
directory['target_url'] = flask.url_for('api_directory',
sha1_git=target)
if context_url:
directory['dir_url'] = context_url + directory['name'] + '/'
return directory
def enrich_metadata_endpoint(content):
"""Enrich metadata endpoint with link to the upper metadata endpoint.
"""
c = content.copy()
c['content_url'] = flask.url_for('api_content_metadata',
q='sha1:%s' % c['id'])
return c
def enrich_content(content):
"""Enrich content with links to:
- data_url: its raw data
- filetype_url: its filetype information
"""
for h in ['sha1', 'sha1_git', 'sha256']:
if h in content:
q = '%s:%s' % (h, content[h])
content['data_url'] = flask.url_for('api_content_raw', q=q)
content['filetype_url'] = flask.url_for('api_content_filetype',
q=q)
content['language_url'] = flask.url_for('api_content_language',
q=q)
content['license_url'] = flask.url_for('api_content_license',
q=q)
break
return content
def enrich_entity(entity):
"""Enrich entity with
"""
if 'uuid' in entity:
entity['uuid_url'] = flask.url_for('api_entity_by_uuid',
uuid=entity['uuid'])
if 'parent' in entity and entity['parent']:
entity['parent_url'] = flask.url_for('api_entity_by_uuid',
uuid=entity['parent'])
return entity
def _get_path_list(path_string):
"""Helper for enrich_revision: get a list of the sha1 id of the navigation
breadcrumbs, ordered from the oldest to the most recent.
Args:
path_string: the path as a '/'-separated string
Returns:
The navigation context as a list of sha1 revision ids
"""
return path_string.split('/')
def _get_revision_contexts(rev_id, context):
"""Helper for enrich_revision: retrieve for the revision id and potentially
the navigation breadcrumbs the context to pass to parents and children of
of the revision.
Args:
rev_id: the revision's sha1 id
context: the current navigation context
Returns:
The context for parents, children and the url of the direct child as a
tuple in that order.
"""
context_for_parents = None
context_for_children = None
url_direct_child = None
if not context:
return (rev_id, None, None)
path_list = _get_path_list(context)
context_for_parents = '%s/%s' % (context, rev_id)
prev_for_children = path_list[:-1]
if len(prev_for_children) > 0:
context_for_children = '/'.join(prev_for_children)
child_id = path_list[-1]
# This commit is not the first commit in the path
if context_for_children:
url_direct_child = flask.url_for(
'api_revision',
sha1_git=child_id,
context=context_for_children)
# This commit is the first commit in the path
else:
url_direct_child = flask.url_for(
'api_revision',
sha1_git=child_id)
return (context_for_parents, context_for_children, url_direct_child)
def _make_child_url(rev_children, context):
"""Helper for enrich_revision: retrieve the list of urls corresponding
to the children of the current revision according to the navigation
breadcrumbs.
Args:
rev_children: a list of revision id
context: the '/'-separated navigation breadcrumbs
Returns:
the list of the children urls according to the context
"""
children = []
for child in rev_children:
if context and child != _get_path_list(context)[-1]:
children.append(flask.url_for('api_revision', sha1_git=child))
elif not context:
children.append(flask.url_for('api_revision', sha1_git=child))
return children
def enrich_revision(revision, context=None):
"""Enrich revision with links where it makes sense (directory, parents).
Keep track of the navigation breadcrumbs if they are specified.
Args:
revision: the revision as a dict
context: the navigation breadcrumbs as a /-separated string of revision
sha1_git
"""
ctx_parents, ctx_children, url_direct_child = _get_revision_contexts(
revision['id'], context)
revision['url'] = flask.url_for('api_revision', sha1_git=revision['id'])
revision['history_url'] = flask.url_for('api_revision_log',
sha1_git=revision['id'])
if context:
revision['history_context_url'] = flask.url_for(
'api_revision_log',
sha1_git=revision['id'],
prev_sha1s=context)
if 'author' in revision:
author = revision['author']
revision['author_url'] = flask.url_for('api_person',
person_id=author['id'])
if 'committer' in revision:
committer = revision['committer']
revision['committer_url'] = flask.url_for('api_person',
person_id=committer['id'])
if 'directory' in revision:
revision['directory_url'] = flask.url_for(
'api_directory',
sha1_git=revision['directory'])
if 'parents' in revision:
parents = []
for parent in revision['parents']:
parents.append(flask.url_for('api_revision',
sha1_git=parent,
context=ctx_parents))
revision['parent_urls'] = parents
if 'children' in revision:
children = _make_child_url(revision['children'], context)
if url_direct_child:
children.append(url_direct_child)
revision['children_urls'] = children
else:
if url_direct_child:
revision['children_urls'] = [url_direct_child]
if 'message_decoding_failed' in revision:
revision['message_url'] = flask.url_for(
'api_revision_raw_message',
sha1_git=revision['id'])
return revision
diff --git a/swh/web/ui/views/api.py b/swh/web/ui/views/api.py
index 5db09b2b..20d79e5b 100644
--- a/swh/web/ui/views/api.py
+++ b/swh/web/ui/views/api.py
@@ -1,950 +1,975 @@
# Copyright (C) 2015-2016 The Software Heritage developers
# See the AUTHORS file at the top-level directory of this distribution
# License: GNU Affero General Public License version 3, or any later version
# See top-level LICENSE file for more information
from types import GeneratorType
from flask import request, url_for
from swh.web.ui import service, utils, apidoc as doc
from swh.web.ui.exc import NotFoundExc
from swh.web.ui.main import app
@app.route('/api/1/stat/counters/')
@doc.route('/api/1/stat/counters/', noargs=True)
@doc.returns(rettype=doc.rettypes.dict,
retdoc="A dictionary of SWH's most important statistics")
def api_stats():
"""Return statistics on SWH storage.
"""
return service.stat_counters()
@app.route('/api/1/origin//visits/')
@doc.route('/api/1/origin/visits/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='The requested SWH origin identifier')
@doc.returns(rettype=doc.rettypes.list,
retdoc="""All instances of visits of the origin pointed by
origin_id as POSIX time since epoch (if visit_id is not defined)
""")
def api_origin_visits(origin_id):
"""Return a list of origin visit (dict) for that particular origin
including date (visit date as posix timestamp), target,
target_type, status, ...
"""
def _enrich_origin_visit(origin_visit):
ov = origin_visit.copy()
ov['origin_visit_url'] = url_for('api_origin_visit',
origin_id=ov['origin'],
visit_id=ov['visit'])
return ov
return _api_lookup(
origin_id,
service.lookup_origin_visits,
error_msg_if_not_found='No origin %s found' % origin_id,
enrich_fn=_enrich_origin_visit)
@app.route('/api/1/origin//visits//')
@doc.route('/api/1/origin/visits/id/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc='The requested SWH origin identifier')
@doc.arg('visit_id',
default=1,
argtype=doc.argtypes.int,
argdoc='The requested SWH origin visit identifier')
@doc.returns(rettype=doc.rettypes.list,
retdoc="""The single instance visit visit_id of the origin pointed
by origin_id as POSIX time since epoch""")
def api_origin_visit(origin_id, visit_id):
"""Return origin visit (dict) for that particular origin including
(but not limited to) date (visit date as posix timestamp),
target, target_type, status, ...
"""
def _enrich_origin_visit(origin_visit):
ov = origin_visit.copy()
ov['origin_url'] = url_for('api_origin', origin_id=ov['origin'])
if 'occurrences' in ov:
ov['occurrences'] = {
k: utils.enrich_object(v)
for k, v in ov['occurrences'].items()
}
return ov
return _api_lookup(
origin_id,
service.lookup_origin_visit,
'No visit %s for origin %s found' % (visit_id, origin_id),
_enrich_origin_visit,
visit_id)
@app.route('/api/1/symbol/', methods=['POST'])
@app.route('/api/1/symbol//')
@doc.route('/api/1/symbol/search')
@doc.arg('q',
default='hello',
argtype=doc.argtypes.str,
argdoc="""An expression string to lookup in swh's raw content""")
@doc.returns(rettype=doc.rettypes.list,
retdoc="""A list of dict whose content matches the expression.
Each dict has the following keys:
- id (bytes): identifier of the content
- name (text): symbol whose content match the expression
- kind (text): kind of the symbol that matched
- lang (text): Language for that entry
- line (int): Number line for the symbol
""")
def api_fulltext_search(q=None):
"""Search a content per expression.
"""
+ result = {}
page = int(request.args.get('page', '1'))
- return _api_lookup(
+ symbols = _api_lookup(
q,
lookup_fn=lambda exp, page=page: service.lookup_expression(exp, page),
error_msg_if_not_found='No indexed raw content match expression \''
'%s\'.' % q,
enrich_fn=utils.enrich_content)
+ if symbols:
+ if page > 1:
+ result.update({
+ 'headers': {
+ 'link-next': utils.next_page('api_fulltext_search',
+ q, page),
+ 'link-prev': utils.prev_page('api_fulltext_search',
+ q, page),
+ }
+ })
+ else:
+ result.update({
+ 'headers': {
+ 'link-next': utils.next_page('api_fulltext_search',
+ q, page),
+ }
+ })
+
+ result.update({
+ 'results': symbols
+ })
+
+ return result
+
@app.route('/api/1/content/search/', methods=['POST'])
@app.route('/api/1/content/search//')
@doc.route('/api/1/content/search/')
@doc.arg('q',
default='sha1:adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH""")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if q is not well formed')
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""A dict with keys:
- search_res: a list of dicts corresponding to queried content
with key 'found' to True if found, 'False' if not
- search_stats: a dict containing number of files searched and
percentage of files found
""")
def api_search(q=None):
"""Search a content per hash.
This may take the form of:
- a GET request with a single checksum
- a POST request with many hashes, with the request body containing
identifiers (typically filenames) as keys and corresponding hashes as
values.
"""
response = {'search_res': None,
'search_stats': None}
search_stats = {'nbfiles': 0, 'pct': 0}
search_res = None
# Single hash request route
if q:
r = service.search_hash(q)
search_res = [{'filename': None,
'sha1': q,
'found': r['found']}]
search_stats['nbfiles'] = 1
search_stats['pct'] = 100 if r['found'] else 0
# Post form submission with many hash requests
elif request.method == 'POST':
data = request.form
queries = []
# Remove potential inputs with no associated value
for k, v in data.items():
if v is not None:
if k == 'q' and len(v) > 0:
queries.append({'filename': None, 'sha1': v})
elif v != '':
queries.append({'filename': k, 'sha1': v})
if len(queries) > 0:
lookup = service.lookup_multiple_hashes(queries)
result = []
for el in lookup:
result.append({'filename': el['filename'],
'sha1': el['sha1'],
'found': el['found']})
search_res = result
nbfound = len([x for x in lookup if x['found']])
search_stats['nbfiles'] = len(queries)
search_stats['pct'] = (nbfound / len(queries))*100
response['search_res'] = search_res
response['search_stats'] = search_stats
return response
def _api_lookup(criteria,
lookup_fn,
error_msg_if_not_found,
enrich_fn=lambda x: x,
*args):
"""Capture a redundant behavior of:
- looking up the backend with a criteria (be it an identifier or checksum)
passed to the function lookup_fn
- if nothing is found, raise an NotFoundExc exception with error
message error_msg_if_not_found.
- Otherwise if something is returned:
- either as list, map or generator, map the enrich_fn function to it
and return the resulting data structure as list.
- either as dict and pass to enrich_fn and return the dict enriched.
Args:
- criteria: discriminating criteria to lookup
- lookup_fn: function expects one criteria and optional supplementary
*args.
- error_msg_if_not_found: if nothing matching the criteria is found,
raise NotFoundExc with this error message.
- enrich_fn: Function to use to enrich the result returned by
lookup_fn. Default to the identity function if not provided.
- *args: supplementary arguments to pass to lookup_fn.
Raises:
NotFoundExp or whatever `lookup_fn` raises.
"""
res = lookup_fn(criteria, *args)
if not res:
raise NotFoundExc(error_msg_if_not_found)
if isinstance(res, (map, list, GeneratorType)):
return [enrich_fn(x) for x in res]
return enrich_fn(res)
@app.route('/api/1/origin//')
@app.route('/api/1/origin//url//')
@doc.route('/api/1/origin/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The origin's SWH origin_id.")
@doc.arg('origin_type',
default='git',
argtype=doc.argtypes.str,
argdoc="The origin's type (git, svn..)")
@doc.arg('origin_url',
default='https://github.com/hylang/hy',
argtype=doc.argtypes.path,
argdoc="The origin's URL.")
@doc.raises(exc=doc.excs.notfound,
doc='Raised if origin_id does not correspond to an origin in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the origin identified by origin_id')
def api_origin(origin_id=None, origin_type=None, origin_url=None):
"""Return information about the origin matching the passed criteria.
Criteria may be:
- An SWH-specific ID, if you already know it
- An origin type and its URL, if you do not have the origin's SWH
identifier
"""
ori_dict = {
'id': origin_id,
'type': origin_type,
'url': origin_url
}
ori_dict = {k: v for k, v in ori_dict.items() if ori_dict[k]}
if 'id' in ori_dict:
error_msg = 'Origin with id %s not found.' % ori_dict['id']
else:
error_msg = 'Origin with type %s and URL %s not found' % (
ori_dict['type'], ori_dict['url'])
def _enrich_origin(origin):
if 'id' in origin:
o = origin.copy()
o['origin_visits_url'] = url_for('api_origin_visits',
origin_id=o['id'])
return o
return origin
return _api_lookup(
ori_dict, lookup_fn=service.lookup_origin,
error_msg_if_not_found=error_msg,
enrich_fn=_enrich_origin)
@app.route('/api/1/person//')
@doc.route('/api/1/person/')
@doc.arg('person_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The person's SWH identifier")
@doc.raises(exc=doc.excs.notfound,
doc='Raised if person_id does not correspond to an origin in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the person identified by person_id')
def api_person(person_id):
"""Return information about person with identifier person_id.
"""
return _api_lookup(
person_id, lookup_fn=service.lookup_person,
error_msg_if_not_found='Person with id %s not found.' % person_id)
@app.route('/api/1/release//')
@doc.route('/api/1/release/')
@doc.arg('sha1_git',
default='8b137891791fe96927ad78e64b0aad7bded08bdc',
argtype=doc.argtypes.sha1_git,
argdoc="The release's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if the argument is not a sha1')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if sha1_git does not correspond to a release in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the release identified by sha1_git')
def api_release(sha1_git):
"""Return information about release with id sha1_git.
"""
error_msg = 'Release with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
lookup_fn=service.lookup_release,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_release)
def _revision_directory_by(revision, path, request_path,
limit=100, with_data=False):
"""Compute the revision matching criterion's directory or content data.
Args:
revision: dictionary of criterions representing a revision to lookup
path: directory's path to lookup
request_path: request path which holds the original context to
limit: optional query parameter to limit the revisions log
(default to 100). For now, note that this limit could impede the
transitivity conclusion about sha1_git not being an ancestor of
with_data: indicate to retrieve the content's raw data if path resolves
to a content.
"""
def enrich_directory_local(dir, context_url=request_path):
return utils.enrich_directory(dir, context_url)
rev_id, result = service.lookup_directory_through_revision(
revision, path, limit=limit, with_data=with_data)
content = result['content']
if result['type'] == 'dir': # dir_entries
result['content'] = list(map(enrich_directory_local, content))
else: # content
result['content'] = utils.enrich_content(content)
return result
@app.route('/api/1/revision'
'/origin/'
'/directory/')
@app.route('/api/1/revision'
'/origin/'
'/directory//')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/directory/')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/directory//')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/ts/'
'/directory/')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/ts/'
'/directory//')
@doc.route('/api/1/revision/origin/directory/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The revision's origin's SWH identifier")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""The optional branch for the given origin (default
to master""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""Optional timestamp (default to the nearest time
crawl of timestamp)""")
@doc.arg('path',
default='.',
argtype=doc.argtypes.path,
argdoc='The path to the directory or file to display')
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a revision matching the passed criteria was
not found""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the revision corresponding to the
passed criteria""")
def api_directory_through_revision_origin(origin_id,
branch_name="refs/heads/master",
ts=None,
path=None,
with_data=False):
"""Display directory or content information through a revision identified
by origin/branch/timestamp.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _revision_directory_by(
{
'origin_id': origin_id,
'branch_name': branch_name,
'ts': ts
},
path,
request.path,
with_data=with_data)
@app.route('/api/1/revision'
'/origin//')
@app.route('/api/1/revision'
'/origin/'
'/branch//')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/ts//')
@app.route('/api/1/revision'
'/origin/'
'/ts//')
@doc.route('/api/1/revision/origin/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The queried revision's origin identifier in SWH")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="""The optional branch for the given origin (default
to master)""")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="The time at which the queried revision should be constrained")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a revision matching given criteria was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the revision identified by the given
criteria""")
def api_revision_with_origin(origin_id,
branch_name="refs/heads/master",
ts=None):
"""Display revision information through its identification by
origin/branch/timestamp.
"""
if ts:
ts = utils.parse_timestamp(ts)
return _api_lookup(
origin_id,
service.lookup_revision_by,
'Revision with (origin_id: %s, branch_name: %s'
', ts: %s) not found.' % (origin_id,
branch_name,
ts),
utils.enrich_revision,
branch_name,
ts)
@app.route('/api/1/revision//')
@app.route('/api/1/revision//prev//')
@doc.route('/api/1/revision/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The revision's sha1_git identifier")
@doc.arg('context',
default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
argtype=doc.argtypes.path,
argdoc='The navigation breadcrumbs -- use at your own risk')
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the revision identified by sha1_git')
def api_revision(sha1_git, context=None):
"""Return information about revision with id sha1_git.
"""
def _enrich_revision(revision, context=context):
return utils.enrich_revision(revision, context)
return _api_lookup(
sha1_git,
service.lookup_revision,
'Revision with sha1_git %s not found.' % sha1_git,
_enrich_revision)
@app.route('/api/1/revision//raw/')
@doc.route('/api/1/revision/raw/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The queried revision's sha1_git identifier")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
retdoc="""The message of the revision identified by sha1_git
as a downloadable octet stream""")
def api_revision_raw_message(sha1_git):
"""Return the raw data of the message of revision identified by sha1_git
"""
raw = service.lookup_revision_message(sha1_git)
return app.response_class(raw['message'],
headers={'Content-disposition': 'attachment;'
'filename=rev_%s_raw' % sha1_git},
mimetype='application/octet-stream')
@app.route('/api/1/revision//directory/')
@app.route('/api/1/revision//directory//')
@doc.route('/api/1/revision/directory/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc="The revision's sha1_git identifier.")
@doc.arg('dir_path',
default='.',
argtype=doc.argtypes.path,
argdoc='The path from the top level directory')
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a revision matching sha1_git was not found in SWH
, or if the path specified does not exist""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the directory pointed by revision id
sha1-git and dir_path""")
def api_revision_directory(sha1_git,
dir_path=None,
with_data=False):
"""Return information on directory pointed by revision with sha1_git.
If dir_path is not provided, display top level directory.
Otherwise, display the directory pointed by dir_path (if it exists).
"""
return _revision_directory_by(
{
'sha1_git': sha1_git
},
dir_path,
request.path,
with_data=with_data)
@app.route('/api/1/revision//log/')
@app.route('/api/1/revision//prev//log/')
@doc.route('/api/1/revision/log/')
@doc.arg('sha1_git',
default='ec72c666fb345ea5f21359b7bc063710ce558e39',
argtype=doc.argtypes.sha1_git,
argdoc='The sha1_git of the revision queried')
@doc.arg('prev_sha1s',
default='6adc4a22f20bbf3bbc754f1ec8c82be5dfb5c71a',
argtype=doc.argtypes.path,
argdoc='The navigation breadcrumbs -- use at your own risk!')
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git or prev_sha1s is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a revision matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The log data starting at the revision identified by
sha1_git, completed with the navigation breadcrumbs,
if any""")
def api_revision_log(sha1_git, prev_sha1s=None):
"""Show all revisions (~git log) starting from sha1_git.
The first element returned is the given sha1_git, or the first
breadcrumb, if any.
"""
limit = app.config['conf']['max_log_revs']
response = {'revisions': None, 'next_revs_url': None}
revisions = None
next_revs_url = None
def lookup_revision_log_with_limit(s, limit=limit+1):
return service.lookup_revision_log(s, limit)
error_msg = 'Revision with sha1_git %s not found.' % sha1_git
rev_get = _api_lookup(sha1_git,
lookup_fn=lookup_revision_log_with_limit,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_revision)
if len(rev_get) == limit+1:
rev_backward = rev_get[:-1]
next_revs_url = url_for('api_revision_log',
sha1_git=rev_get[-1]['id'])
else:
rev_backward = rev_get
if not prev_sha1s: # no nav breadcrumbs, so we're done
revisions = rev_backward
else:
rev_forward_ids = prev_sha1s.split('/')
rev_forward = _api_lookup(rev_forward_ids,
lookup_fn=service.lookup_revision_multiple,
error_msg_if_not_found=error_msg,
enrich_fn=utils.enrich_revision)
revisions = rev_forward + rev_backward
response['revisions'] = revisions
response['next_revs_url'] = next_revs_url
return response
@app.route('/api/1/revision'
'/origin//log/')
@app.route('/api/1/revision'
'/origin/'
'/branch//log/')
@app.route('/api/1/revision'
'/origin/'
'/branch/'
'/ts//log/')
@app.route('/api/1/revision'
'/origin/'
'/ts//log/')
@doc.route('/api/1/revision/origin/log/')
@doc.arg('origin_id',
default=1,
argtype=doc.argtypes.int,
argdoc="The revision's SWH origin identifier")
@doc.arg('branch_name',
default='refs/heads/master',
argtype=doc.argtypes.path,
argdoc="The revision's branch name within the origin specified")
@doc.arg('ts',
default='2000-01-17T11:23:54+00:00',
argtype=doc.argtypes.ts,
argdoc="""A time or timestamp string to parse""")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a revision matching the given criteria was not
found in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the revision log starting at the revision
matching the given criteria.""")
def api_revision_log_by(origin_id,
branch_name='refs/heads/master',
ts=None):
"""Show all revisions (~git log) starting from the revision
described by its origin_id, optional branch name and timestamp.
The first element returned is the described revision.
"""
limit = app.config['conf']['max_log_revs']
response = {'revisions': None, 'next_revs_url': None}
next_revs_url = None
if ts:
ts = utils.parse_timestamp(ts)
def lookup_revision_log_by_with_limit(o_id, br, ts, limit=limit+1):
return service.lookup_revision_log_by(o_id, br, ts, limit)
error_msg = 'No revision matching origin %s ' % origin_id
error_msg += ', branch name %s' % branch_name
error_msg += (' and time stamp %s.' % ts) if ts else '.'
rev_get = _api_lookup(origin_id,
lookup_revision_log_by_with_limit,
error_msg,
utils.enrich_revision,
branch_name,
ts)
if len(rev_get) == limit+1:
revisions = rev_get[:-1]
next_revs_url = url_for('api_revision_log',
sha1_git=rev_get[-1]['id'])
else:
revisions = rev_get
response['revisions'] = revisions
response['next_revs_url'] = next_revs_url
return response
@app.route('/api/1/directory//')
@app.route('/api/1/directory///')
@doc.route('/api/1/directory/')
@doc.arg('sha1_git',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.sha1_git,
argdoc="The queried directory's corresponding sha1_git hash")
@doc.arg('path',
default='.',
argtype=doc.argtypes.path,
argdoc="A path relative to the queried directory's top level")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if sha1_git is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a directory matching sha1_git was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata and contents of the release identified by
sha1_git""")
def api_directory(sha1_git,
path=None):
"""Return information about release with id sha1_git.
"""
if path:
error_msg_path = ('Entry with path %s relative to directory '
'with sha1_git %s not found.') % (path, sha1_git)
return _api_lookup(
sha1_git,
service.lookup_directory_with_path,
error_msg_path,
utils.enrich_directory,
path)
else:
error_msg_nopath = 'Directory with sha1_git %s not found.' % sha1_git
return _api_lookup(
sha1_git,
service.lookup_directory,
error_msg_nopath,
utils.enrich_directory)
@app.route('/api/1/provenance//')
@doc.route('/api/1/provenance/')
@doc.arg('q',
default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
argtype=doc.argtypes.algo_and_hash,
argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""List of provenance information (dict) for the matched
content.""")
def api_content_provenance(q):
"""Return content's provenance information if any.
"""
def _enrich_revision(provenance):
p = provenance.copy()
p['revision_url'] = url_for('api_revision',
sha1_git=provenance['revision'])
p['content_url'] = url_for('api_content_metadata',
q='sha1_git:%s' % provenance['content'])
p['origin_url'] = url_for('api_origin',
origin_id=provenance['origin'])
p['origin_visits_url'] = url_for('api_origin_visits',
origin_id=provenance['origin'])
p['origin_visit_url'] = url_for('api_origin_visit',
origin_id=provenance['origin'],
visit_id=provenance['visit'])
return p
return _api_lookup(
q,
lookup_fn=service.lookup_content_provenance,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=_enrich_revision)
@app.route('/api/1/filetype//')
@doc.route('/api/1/filetype/')
@doc.arg('q',
default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
argtype=doc.argtypes.algo_and_hash,
argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""Filetype information (dict) for the matched
content.""")
def api_content_filetype(q):
"""Return content's filetype information if any.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_filetype,
error_msg_if_not_found='No filetype information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/language//')
@doc.route('/api/1/language/')
@doc.arg('q',
default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
argtype=doc.argtypes.algo_and_hash,
argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""Language information (dict) for the matched
content.""")
def api_content_language(q):
"""Return content's language information if any.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_language,
error_msg_if_not_found='No language information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/license//')
@doc.route('/api/1/license/')
@doc.arg('q',
default='sha1_git:88b9b366facda0b5ff8d8640ee9279bed346f242',
argtype=doc.argtypes.algo_and_hash,
argdoc="""The queried content's corresponding hash (supported hash
algorithms: sha1_git, sha1, sha256)""")
@doc.raises(exc=doc.excs.badinput,
doc="""Raised if hash algorithm is incorrect or if the hash
value is badly formatted.""")
@doc.raises(exc=doc.excs.notfound,
doc="""Raised if a content matching the hash was not found
in SWH""")
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""License information (dict) for the matched
content.""")
def api_content_license(q):
"""Return content's license information if any.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content_license,
error_msg_if_not_found='No license information found '
'for content %s.' % q,
enrich_fn=utils.enrich_metadata_endpoint)
@app.route('/api/1/content//raw/')
@doc.route('/api/1/content/raw/')
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH. Defaults
to sha1 in the case of a missing algo_hash
""")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.octet_stream,
retdoc='The raw content data as an octet stream')
def api_content_raw(q):
"""Return content's raw data if content is found.
"""
def generate(content):
yield content['data']
content = service.lookup_content_raw(q)
if not content:
raise NotFoundExc('Content with %s not found.' % q)
return app.response_class(generate(content),
headers={'Content-disposition': 'attachment;'
'filename=content_%s_raw' % q},
mimetype='application/octet-stream')
@app.route('/api/1/content//')
@doc.route('/api/1/content/')
@doc.arg('q',
default='adc83b19e793491b1c6ea0fd8b46cd9f32e592fc',
argtype=doc.argtypes.algo_and_hash,
argdoc="""An algo_hash:hash string, where algo_hash is one of sha1,
sha1_git or sha256 and hash is the hash to search for in SWH. Defaults
to sha1 in the case of a missing algo_hash
""")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if q is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if a content matching q was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc="""The metadata of the content identified by q. If content
decoding was successful, it also returns the data""")
def api_content_metadata(q):
"""Return content information if content is found.
"""
return _api_lookup(
q,
lookup_fn=service.lookup_content,
error_msg_if_not_found='Content with %s not found.' % q,
enrich_fn=utils.enrich_content)
@app.route('/api/1/entity//')
@doc.route('/api/1/entity/')
@doc.arg('uuid',
default='5f4d4c51-498a-4e28-88b3-b3e4e8396cba',
argtype=doc.argtypes.uuid,
argdoc="The entity's uuid identifier")
@doc.raises(exc=doc.excs.badinput,
doc='Raised if uuid is not well formed')
@doc.raises(exc=doc.excs.notfound,
doc='Raised if an entity matching uuid was not found in SWH')
@doc.returns(rettype=doc.rettypes.dict,
retdoc='The metadata of the entity identified by uuid')
def api_entity_by_uuid(uuid):
"""Return content information if content is found.
"""
return _api_lookup(
uuid,
lookup_fn=service.lookup_entity_by_uuid,
error_msg_if_not_found="Entity with uuid '%s' not found." % uuid,
enrich_fn=utils.enrich_entity)